package com.atguigu.demo;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Demo: Flink SQL lookup join.
 *
 * <p>Pipeline:
 * <ol>
 *   <li>Create a streaming source table {@code order_info} backed by the Kafka topic
 *       {@code demo_order_info} (JSON records such as
 *       {@code {"id":"101","user_id":"u_101","status":"1001","amount":200,"ts":9900}}).</li>
 *   <li>Create a JDBC-backed dimension table {@code base_dic} used as the lookup side.</li>
 *   <li>Create an upsert-kafka sink table {@code demo_order_wide}.</li>
 *   <li>Run a {@code FOR SYSTEM_TIME AS OF} lookup join and insert the widened rows
 *       into the sink.</li>
 * </ol>
 *
 * <p>Note: {@code tableEnv.executeSql(...)} submits/executes the statement immediately
 * (an "action"), whereas {@code tableEnv.sqlQuery(...)} only builds a {@code Table}
 * (a "transformation") — this demo only needs {@code executeSql}.
 */
public class FlinkSQLLookupJoin {

    public static void main(String[] args) {
        // Set up the streaming environment and its Table API bridge.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        createOrderSourceTable(tableEnv);
        createDictionaryLookupTable(tableEnv);
        createOrderWideSinkTable(tableEnv);
        runLookupJoin(tableEnv);
        // No env.execute() needed: the INSERT submitted by executeSql is itself a job.
    }

    /**
     * Creates the streaming source table over Kafka topic {@code demo_order_info}.
     * The computed column {@code proc_time AS PROCTIME()} supplies the processing-time
     * attribute required by the lookup join's FOR SYSTEM_TIME AS OF clause.
     */
    private static void createOrderSourceTable(StreamTableEnvironment tableEnv) {
        String createOrderInfoSQL =
                "CREATE TABLE order_info (\n"
                + "  `id` STRING,\n"
                + "  `user_id` STRING,\n"
                + "  `status` STRING,\n"
                + "  `amount` STRING,\n"
                + "  `ts` STRING,\n"
                + "  `proc_time` AS PROCTIME()\n"
                + ") WITH (\n"
                + "  'connector' = 'kafka',\n"
                + "  'topic' = 'demo_order_info',\n"
                + "  'properties.bootstrap.servers' = 'hadoop102:9092',\n"
                + "  'properties.group.id' = 'flink_lookupjoin_demo',\n"
                + "  'scan.startup.mode' = 'latest-offset',\n"
                + "  'format' = 'json'\n"
                + ")";
        tableEnv.executeSql(createOrderInfoSQL);
    }

    /**
     * Creates the JDBC dimension table {@code base_dic} used as the lookup (temporal)
     * side of the join. The lookup cache options bound how often MySQL is queried.
     * NOTE(review): 'lookup.cache.max-rows'/'lookup.cache.ttl' are the legacy (pre-1.16)
     * option names — confirm they match the JDBC connector version on the classpath.
     */
    private static void createDictionaryLookupTable(StreamTableEnvironment tableEnv) {
        String createLookupTableSQL =
                "CREATE TABLE base_dic (\n"
                + "  dic_code STRING,\n"
                + "  dic_name STRING,\n"
                + "  PRIMARY KEY (dic_code) NOT ENFORCED\n"
                + ") WITH (\n"
                + "  'connector' = 'jdbc',\n"
                + "  'driver' = 'com.mysql.cj.jdbc.Driver',\n"
                + "  'url' = 'jdbc:mysql://hadoop102:3306/gmall',\n"
                + "  'table-name' = 'base_dic',\n"
                + "  'username' = 'root',\n"
                + "  'password' = '000000',\n"
                + "  'lookup.cache.max-rows' = '1000',\n"
                + "  'lookup.cache.ttl' = '60s'\n"
                + ")";
        tableEnv.executeSql(createLookupTableSQL);
    }

    /**
     * Creates the sink table: widened orders (order id, user id, status name, amount,
     * timestamp) written to Kafka topic {@code demo_order_wide} via upsert-kafka,
     * keyed by {@code order_id}.
     */
    private static void createOrderWideSinkTable(StreamTableEnvironment tableEnv) {
        String createDemoOrderWideSQL =
                "CREATE TABLE demo_order_wide (\n"
                + "  order_id STRING,\n"
                + "  user_id STRING,\n"
                + "  order_status STRING,\n"
                + "  order_amount STRING,\n"
                + "  ts STRING,\n"
                + "  PRIMARY KEY (order_id) NOT ENFORCED\n"
                + ") WITH (\n"
                + "  'connector' = 'upsert-kafka',\n"
                + "  'topic' = 'demo_order_wide',\n"
                + "  'properties.bootstrap.servers' = 'hadoop102:9092',\n"
                + "  'key.format' = 'json',\n"
                + "  'value.format' = 'json'\n"
                + ")";
        tableEnv.executeSql(createDemoOrderWideSQL);
    }

    /**
     * Runs the lookup join: for each order, resolves {@code status} against
     * {@code base_dic} as of the order's processing time, and inserts the widened
     * row into {@code demo_order_wide}. Select-list order matches the sink schema:
     * (order_id, user_id, order_status, order_amount, ts).
     */
    private static void runLookupJoin(StreamTableEnvironment tableEnv) {
        String lookupJoinSQL =
                "INSERT INTO demo_order_wide\n"
                + "SELECT o.id, o.user_id, d.dic_name, o.amount, o.ts\n"
                + "FROM order_info AS o\n"
                + "  JOIN base_dic FOR SYSTEM_TIME AS OF o.proc_time AS d\n"
                + "    ON o.`status` = d.dic_code";
        tableEnv.executeSql(lookupJoinSQL);
    }
}
